From 09addb7aaa2a13fcb4727a7066d9d563ae508149 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 28 Jul 2014 21:23:48 -0700 Subject: [PATCH] Power the TaskPool with a sync channel This rate limits the jobs entering the pool to ensure that the number of pending jobs is actually close to the number of cores. The current behavior is to print all pending jobs, even when they're not executing, which may be confusing. --- src/cargo/util/pool.rs | 47 +++++++++++++----------------------------- 1 file changed, 14 insertions(+), 33 deletions(-) diff --git a/src/cargo/util/pool.rs b/src/cargo/util/pool.rs index 2b3697a41..1dc2d7345 100644 --- a/src/cargo/util/pool.rs +++ b/src/cargo/util/pool.rs @@ -8,54 +8,35 @@ use std::sync::{Arc, Mutex}; pub struct TaskPool { - state: Arc>, + tx: SyncSender, } -struct State { done: bool, jobs: Vec } - impl TaskPool { pub fn new(tasks: uint) -> TaskPool { assert!(tasks > 0); + let (tx, rx) = sync_channel(tasks); - let state = Arc::new(Mutex::new(State { - done: false, - jobs: Vec::new(), - })); + let state = Arc::new(Mutex::new(rx)); for _ in range(0, tasks) { - let myjobs = state.clone(); - spawn(proc() worker(&*myjobs)); + let state = state.clone(); + spawn(proc() worker(&*state)); } - return TaskPool { state: state }; - - fn worker(mystate: &Mutex) { - let mut state = mystate.lock(); - while !state.done { - match state.jobs.pop() { - Some(job) => { - drop(state); - job(); - state = mystate.lock(); - } - None => state.cond.wait(), + return TaskPool { tx: tx }; + + fn worker(rx: &Mutex>) { + loop { + let job = rx.lock().recv_opt(); + match job { + Ok(job) => job(), + Err(..) => break, } } } } pub fn execute(&self, job: proc():Send) { - let mut state = self.state.lock(); - state.jobs.push(job); - state.cond.signal(); - } -} - -impl Drop for TaskPool { - fn drop(&mut self) { - let mut state = self.state.lock(); - state.done = true; - state.cond.broadcast(); - drop(state); + self.tx.send(job); } } -- 2.30.2